本篇是實作常用的 AWS MSK 服務之 Terraform 模組,完整的專案程式碼分享在我的 GitHub 上。
my_msk
的放置位置 modules/my_msk
:├── configs
│ ├── iam
│ │ ├── assume_role_policies
│ │ ├── policies
│ │ ├── role_policies
│ │ ├── user_policies
│ │ └── iam.yaml
│ ├── s3
│ │ ├── policies
│ │ └── s3.yaml
│ ├── subnet
│ │ └── my-subnets.yaml
│ └── vpc
│ └── my-vpcs.yaml
├── example.tfvars
├── locals.tf
├── main.tf
├── modules
│ ├── my_cloudfront
│ ├── my_cloudwatch
│ ├── my_eips
│ ├── my_eventbridge
│ ├── my_iam
│ ├── my_igw
│ ├── my_instances
│ ├── my_kinesis_stream
│ ├── my_kms
│ ├── my_msk
│ │ ├── cluster.tf
│ │ ├── configuration.tf
│ │ ├── outputs.tf
│ │ ├── provider.tf
│ │ ├── scram-secret-association.tf
│ │ ├── storage_autoscale.tf
│ │ └── variables.tf
│ ├── my_nacls
│ ├── my_route_tables
│ ├── my_s3
│ ├── my_subnets
│ └── my_vpc
└── variables.tf
my_msk
模組:./modules/my_msk/outputs.tf
# Exposes the SASL/SCRAM bootstrap broker endpoints of the MSK cluster so that
# callers of this module can hand them to Kafka clients.
output "kafka_bootstrap_brokers_sasl_scram" {
  description = "Bootstrap broker host:port pairs for SASL/SCRAM client connections"

  # The cluster is declared in cluster.tf as aws_msk_cluster.kafka; the
  # previous reference to aws_msk_cluster.my_kafka pointed at a resource that
  # does not exist in this module.
  value = aws_msk_cluster.kafka.bootstrap_brokers_sasl_scram
}
./modules/my_msk/provider.tf
# AWS provider used by this module. Both settings are supplied through the
# module variables var.aws_profile and var.aws_region.
# NOTE(review): a provider block inside a child module prevents callers from
# using count / for_each / depends_on on the module call; consider moving it
# to the root configuration.
provider "aws" {
  profile = var.aws_profile
  region  = var.aws_region
}
./modules/my_msk/variables.tf
# ---------------------------------------------------------------------------
# Provider settings
# ---------------------------------------------------------------------------

variable "aws_region" {
  type        = string
  description = "AWS region"
  default     = "ap-northeast-1"
}

variable "aws_profile" {
  type        = string
  description = "AWS profile"
  default     = ""
}

# ---------------------------------------------------------------------------
# Project metadata
# ---------------------------------------------------------------------------

variable "project_name" {
  type        = string
  description = "Project name"
  default     = ""
}

variable "department_name" {
  type        = string
  description = "Department name"
  default     = "SRE"
}

# ---------------------------------------------------------------------------
# SASL/SCRAM users and broker configuration
# ---------------------------------------------------------------------------

# Not marked `sensitive`: the username of every entry is used as a for_each
# key in scram-secret-association.tf, and Terraform rejects sensitive values
# in for_each. Supply the passwords from a secure source instead of committing
# them to version control.
variable "msk_users" {
  type = list(object({
    username = string
    password = string
  }))
  description = "SASL/SCRAM users; one Secrets Manager secret named AmazonMSK_<username> is created per entry"
}

variable "server_properties" {
  type        = string
  description = "Contents of the Kafka server.properties used for the MSK configuration"
}

# ---------------------------------------------------------------------------
# Cluster sizing and placement
# ---------------------------------------------------------------------------

variable "kafka_cluster_name" {
  type        = string
  description = "Name of the MSK cluster"
}

variable "kafka_version" {
  type        = string
  description = "Kafka version for both the cluster and its configuration"
}

# Declared as a number (it was a string before); string values such as "2"
# are still accepted because Terraform converts them automatically.
variable "kafka_number_of_broker_nodes" {
  type        = number
  description = "Number of broker nodes in the cluster"
}

variable "kafka_instance_type" {
  type        = string
  description = "Broker instance type, e.g. kafka.t3.small"
}

variable "kafka_ebs_volume_size" {
  type        = number
  description = "Initial EBS volume size of each broker"
}

variable "kafka_client_subnets" {
  type        = list(string)
  description = "IDs of the subnets the brokers are placed in"
}

variable "kafka_security_groups" {
  type        = list(string)
  description = "IDs of the security groups attached to the brokers"
}

variable "kafka_scaling_max_capacity" {
  type        = number
  description = "Upper bound for broker storage autoscaling; should not be smaller than kafka_ebs_volume_size"
}

variable "kafka_log_group_name" {
  type        = string
  description = "CloudWatch Logs log group that receives the broker logs"
  default     = "/nxd/kafka"
}
./modules/my_msk/cluster.tf
# MSK cluster with SASL/SCRAM client authentication, Prometheus open
# monitoring (JMX and node exporters) and broker logs shipped to CloudWatch
# Logs.
resource "aws_msk_cluster" "kafka" {
  cluster_name           = var.kafka_cluster_name
  kafka_version          = var.kafka_version
  number_of_broker_nodes = var.kafka_number_of_broker_nodes

  broker_node_group_info {
    instance_type   = var.kafka_instance_type
    client_subnets  = var.kafka_client_subnets
    security_groups = var.kafka_security_groups

    storage_info {
      ebs_storage_info {
        # Initial size only; see the lifecycle block at the bottom.
        volume_size = var.kafka_ebs_volume_size

        provisioned_throughput {
          enabled = false
        }
      }
    }
  }

  client_authentication {
    sasl {
      scram = true
    }
  }

  # Referencing the configuration's attributes already makes Terraform create
  # it before the cluster, so no explicit depends_on is needed.
  configuration_info {
    arn      = aws_msk_configuration.kafka_config_general.arn
    revision = aws_msk_configuration.kafka_config_general.latest_revision
  }

  open_monitoring {
    prometheus {
      jmx_exporter {
        enabled_in_broker = true
      }
      node_exporter {
        enabled_in_broker = true
      }
    }
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled = true
        # NOTE(review): no aws_cloudwatch_log_group is declared in this
        # module's files as published; presumably the log group must already
        # exist. Confirm before applying.
        log_group = var.kafka_log_group_name
      }
    }
  }

  tags = {
    type = "kafkaMSK"
  }

  # Broker storage is grown by Application Auto Scaling
  # (storage_autoscale.tf). Without this, the next apply after a scale-out
  # would try to set the volume back to var.kafka_ebs_volume_size.
  lifecycle {
    ignore_changes = [
      broker_node_group_info[0].storage_info[0].ebs_storage_info[0].volume_size,
    ]
  }
}
./modules/my_msk/configuration.tf
# MSK configuration holding the broker server.properties. It is declared for
# the single Kafka version the cluster runs.
resource "aws_msk_configuration" "kafka_config_general" {
  name              = "kafka-config-general"
  kafka_versions    = [var.kafka_version]
  server_properties = var.server_properties
}
./modules/my_msk/scram-secret-association.tf
:
# One Secrets Manager secret per SCRAM user, keyed by username and encrypted
# with the module's KMS key. The "AmazonMSK_" name prefix is what MSK expects
# for secrets associated with a cluster.
resource "aws_secretsmanager_secret" "kafka_secrets" {
  for_each = { for user in var.msk_users : user.username => user }

  # each.key is the username (see the for_each map above).
  name       = "AmazonMSK_${each.key}"
  kms_key_id = aws_kms_key.kafka_key.key_id
}
# KMS key referenced by aws_secretsmanager_secret.kafka_secrets (kms_key_id)
# to encrypt the SCRAM credential secrets.
resource "aws_kms_key" "kafka_key" {
description = "Example Key for MSK Cluster Scram Secret Association"
}
# Stores each user's credentials as the JSON document
# {"username": ..., "password": ...} in the matching secret.
resource "aws_secretsmanager_secret_version" "kafka_secrets" {
  for_each = { for user in var.msk_users : user.username => user }

  # Referencing the secret's id already orders it before this version, so no
  # explicit depends_on is required.
  secret_id = aws_secretsmanager_secret.kafka_secrets[each.key].id

  secret_string = jsonencode({
    username = each.value.username
    password = each.value.password
  })
}
# Resource policy on each secret that allows the MSK service principal
# (kafka.amazonaws.com) to read the secret value.
resource "aws_secretsmanager_secret_policy" "kafka_secrets" {
  for_each = { for user in var.msk_users : user.username => user }

  # The reference to the secret's ARN is an implicit dependency, so no
  # explicit depends_on is required.
  secret_arn = aws_secretsmanager_secret.kafka_secrets[each.key].arn

  # jsonencode guarantees syntactically valid JSON and avoids nesting
  # interpolations inside a hand-written heredoc document.
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "AWSKafkaResourcePolicy"
        Effect = "Allow"
        Principal = {
          Service = "kafka.amazonaws.com"
        }
        Action   = "secretsmanager:getSecretValue"
        Resource = aws_secretsmanager_secret.kafka_secrets[each.key].arn
      }
    ]
  })
}
# Associates every user's secret with the cluster so the credentials can be
# used for SASL/SCRAM authentication.
resource "aws_msk_scram_secret_association" "kafka" {
  cluster_arn = aws_msk_cluster.kafka.arn

  # One ARN per entry of var.msk_users, looked up by username.
  secret_arn_list = [
    for user in var.msk_users :
    aws_secretsmanager_secret.kafka_secrets[user.username].arn
  ]

  # Wait until the secret values exist before associating the secrets.
  depends_on = [aws_secretsmanager_secret_version.kafka_secrets]
}
./modules/my_msk/storage_autoscale.tf
# Registers the cluster's broker storage volume size as a scalable target
# with Application Auto Scaling.
resource "aws_appautoscaling_target" "kafka_storage" {
  service_namespace  = "kafka"
  scalable_dimension = "kafka:broker-storage:VolumeSize"
  resource_id        = aws_msk_cluster.kafka.arn

  # Bounds of the scalable dimension; the upper bound is set by the caller.
  min_capacity = 1
  max_capacity = var.kafka_scaling_max_capacity
}
# Target-tracking policy on the KafkaBrokerStorageUtilization metric with a
# target value of 80, attached to the broker storage scalable target.
resource "aws_appautoscaling_policy" "kafka_scaling_policy" {
  name        = "nxd-kafka-broker-scaling"
  policy_type = "TargetTrackingScaling"

  # All three identifiers are read from the registered target; its
  # resource_id is the cluster ARN.
  service_namespace  = aws_appautoscaling_target.kafka_storage.service_namespace
  scalable_dimension = aws_appautoscaling_target.kafka_storage.scalable_dimension
  resource_id        = aws_appautoscaling_target.kafka_storage.resource_id

  target_tracking_scaling_policy_configuration {
    target_value = 80

    predefined_metric_specification {
      predefined_metric_type = "KafkaBrokerStorageUtilization"
    }
  }
}
example.tfvars
# Example variable values; replace <YOUR_PROFILE> with your own AWS CLI
# profile name.
aws_region      = "ap-northeast-1"
aws_profile     = "<YOUR_PROFILE>"
project_name    = "example"
department_name = "SRE"
main.tf
# Root settings: pins the AWS provider and stores state in S3 with DynamoDB
# state locking.
terraform {
  required_providers {
    aws = {
      # State the provider source explicitly instead of relying on the
      # implied hashicorp/<name> default.
      source  = "hashicorp/aws"
      version = "5.15.0"
    }
  }

  backend "s3" {
    bucket         = "<YOUR_S3_BUCKET_NAME>"
    dynamodb_table = "<YOUR_DYNAMODB_TABLE_NAME>"
    key            = "terraform.tfstate"
    region         = "ap-northeast-1"
    profile        = "<YOUR_PROFILE>"

    # NOTE(review): this points at the AWS CLI *config* file rather than
    # ~/.aws/credentials; confirm this is intended for your profile setup.
    shared_credentials_file = "~/.aws/config"
  }
}
其他模組省略...
# msk
# Instantiates the MSK module: a two-broker cluster spread over two
# application subnets, with one SASL/SCRAM user.
module "kafka" {
  # The module lives in ./modules/my_msk; the previous path
  # ./modules/nxd_msk does not exist in this repository layout.
  source = "./modules/my_msk"

  aws_profile = var.aws_profile
  aws_region  = var.aws_region

  kafka_cluster_name           = "my-kafka"
  kafka_version                = "2.8.1"
  kafka_number_of_broker_nodes = 2
  kafka_instance_type          = "kafka.t3.small"

  # NOTE(review): the autoscaling ceiling (500) is below the initial volume
  # size (600), so storage autoscaling can never grow the volumes. Raise
  # kafka_scaling_max_capacity above kafka_ebs_volume_size.
  kafka_ebs_volume_size      = 600
  kafka_scaling_max_capacity = 500

  kafka_client_subnets = [
    module.subnet.subnets["my-application-ap-northeast-1a"].id,
    module.subnet.subnets["my-application-ap-northeast-1c"].id,
  ]
  kafka_security_groups = ["sg-0d7ae01cd3dc3a16d"]

  server_properties = <<PROPERTIES
auto.create.topics.enable=false
default.replication.factor=2
min.insync.replicas=1
num.io.threads=8
num.network.threads=5
num.partitions=10
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
PROPERTIES

  # NOTE(review): example credentials only. Do not commit real passwords;
  # supply them through a variable populated from a secure source.
  msk_users = [
    {
      username = "msk-user"
      password = "kDzq86Y03QZ0"
    }
  ]
}
terraform init && terraform plan --out .plan -var-file=example.tfvars
來確認一下結果:
Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
+ create
Terraform will perform the following actions:
# module.msk.aws_appautoscaling_policy.kafka_scaling_policy will be created
+ resource "aws_appautoscaling_policy" "kafka_scaling_policy" {
+ alarm_arns = (known after apply)
+ arn = (known after apply)
+ id = (known after apply)
+ name = "nxd-kafka-broker-scaling"
+ policy_type = "TargetTrackingScaling"
+ resource_id = (known after apply)
+ scalable_dimension = "kafka:broker-storage:VolumeSize"
+ service_namespace = "kafka"
+ target_tracking_scaling_policy_configuration {
+ disable_scale_in = false
+ target_value = 80
+ predefined_metric_specification {
+ predefined_metric_type = "KafkaBrokerStorageUtilization"
}
}
}
# module.msk.aws_appautoscaling_target.kafka_storage will be created
+ resource "aws_appautoscaling_target" "kafka_storage" {
+ arn = (known after apply)
+ id = (known after apply)
+ max_capacity = 500
+ min_capacity = 1
+ resource_id = (known after apply)
+ role_arn = (known after apply)
+ scalable_dimension = "kafka:broker-storage:VolumeSize"
+ service_namespace = "kafka"
+ tags_all = (known after apply)
}
# module.msk.aws_kms_key.kafka_key will be created
+ resource "aws_kms_key" "kafka_key" {
+ arn = (known after apply)
+ bypass_policy_lockout_safety_check = false
+ customer_master_key_spec = "SYMMETRIC_DEFAULT"
+ description = "Example Key for MSK Cluster Scram Secret Association"
+ enable_key_rotation = false
+ id = (known after apply)
+ is_enabled = true
+ key_id = (known after apply)
+ key_usage = "ENCRYPT_DECRYPT"
+ multi_region = (known after apply)
+ policy = (known after apply)
+ tags_all = (known after apply)
}
# module.msk.aws_msk_cluster.kafka will be created
+ resource "aws_msk_cluster" "kafka" {
+ arn = (known after apply)
+ bootstrap_brokers = (known after apply)
+ bootstrap_brokers_public_sasl_iam = (known after apply)
+ bootstrap_brokers_public_sasl_scram = (known after apply)
+ bootstrap_brokers_public_tls = (known after apply)
+ bootstrap_brokers_sasl_iam = (known after apply)
+ bootstrap_brokers_sasl_scram = (known after apply)
+ bootstrap_brokers_tls = (known after apply)
+ bootstrap_brokers_vpc_connectivity_sasl_iam = (known after apply)
+ bootstrap_brokers_vpc_connectivity_sasl_scram = (known after apply)
+ bootstrap_brokers_vpc_connectivity_tls = (known after apply)
+ cluster_name = "my-kafka"
+ current_version = (known after apply)
+ enhanced_monitoring = "DEFAULT"
+ id = (known after apply)
+ kafka_version = "2.8.1"
+ number_of_broker_nodes = 2
+ storage_mode = (known after apply)
+ tags = {
+ "type" = "kafkaMSK"
}
+ tags_all = {
+ "type" = "kafkaMSK"
}
+ zookeeper_connect_string = (known after apply)
+ zookeeper_connect_string_tls = (known after apply)
+ broker_node_group_info {
+ az_distribution = "DEFAULT"
+ client_subnets = [
+ "subnet-068ec8e9ec1f4ed8b",
+ "subnet-0a7ba0f71e5500d41",
]
+ instance_type = "kafka.t3.small"
+ security_groups = [
+ "sg-0d7ae01cd3dc3a16d",
]
+ storage_info {
+ ebs_storage_info {
+ volume_size = 600
+ provisioned_throughput {
+ enabled = false
}
}
}
}
+ client_authentication {
+ sasl {
+ scram = true
}
}
+ configuration_info {
+ arn = (known after apply)
+ revision = (known after apply)
}
+ logging_info {
+ broker_logs {
+ cloudwatch_logs {
+ enabled = true
+ log_group = "/nxd/kafka"
}
}
}
+ open_monitoring {
+ prometheus {
+ jmx_exporter {
+ enabled_in_broker = true
}
+ node_exporter {
+ enabled_in_broker = true
}
}
}
}
# module.msk.aws_msk_configuration.kafka_config_general will be created
+ resource "aws_msk_configuration" "kafka_config_general" {
+ arn = (known after apply)
+ id = (known after apply)
+ kafka_versions = [
+ "2.8.1",
]
+ latest_revision = (known after apply)
+ name = "kafka-config-general"
+ server_properties = <<-EOT
auto.create.topics.enable=false
default.replication.factor=2
min.insync.replicas=1
num.io.threads=8
num.network.threads=5
num.partitions=10
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
EOT
}
# module.msk.aws_msk_scram_secret_association.kafka will be created
+ resource "aws_msk_scram_secret_association" "kafka" {
+ cluster_arn = (known after apply)
+ id = (known after apply)
+ secret_arn_list = (known after apply)
}
# module.msk.aws_secretsmanager_secret.kafka_secrets["msk-user"] will be created
+ resource "aws_secretsmanager_secret" "kafka_secrets" {
+ arn = (known after apply)
+ force_overwrite_replica_secret = false
+ id = (known after apply)
+ kms_key_id = (known after apply)
+ name = "AmazonMSK_msk-user"
+ name_prefix = (known after apply)
+ policy = (known after apply)
+ recovery_window_in_days = 30
+ tags_all = (known after apply)
}
# module.msk.aws_secretsmanager_secret_policy.kafka_secrets["msk-user"] will be created
+ resource "aws_secretsmanager_secret_policy" "kafka_secrets" {
+ id = (known after apply)
+ policy = (known after apply)
+ secret_arn = (known after apply)
}
# module.msk.aws_secretsmanager_secret_version.kafka_secrets["msk-user"] will be created
+ resource "aws_secretsmanager_secret_version" "kafka_secrets" {
+ arn = (known after apply)
+ id = (known after apply)
+ secret_id = (known after apply)
+ secret_string = (sensitive value)
+ version_id = (known after apply)
+ version_stages = (known after apply)
}
Plan: 9 to add, 0 to change, 0 to destroy.
───────────────────────────────────────────────────────────────────────────────────
Saved the plan to: .plan
To perform exactly these actions, run the following command to apply:
terraform apply ".plan"
Releasing state lock. This may take a few moments...
下一篇文章將會展示實作 Using Packer to Create an AMI 以建置 Cassandra Cluster 為例子。